﻿// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Client;

using Crypto;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Json;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Grpc.Net.Client;
using System.Diagnostics.CodeAnalysis;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;

/// <summary>
/// A client for interacting with the Dapr endpoints.
/// </summary>
internal class DaprClientGrpc : DaprClient
{
    // Keys under which CreateInvokeMethodRequest stores the target app id and method name in
    // HttpRequestMessage.Options, so they can be read back when an invocation failure is reported.
    private const string AppIdKey = "appId";
    private const string MethodNameKey = "methodName";

    // Base URI used to build service invocation URLs and to check that outgoing requests target it.
    private readonly Uri httpEndpoint;
    // HTTP client used to send service invocation requests.
    private readonly HttpClient httpClient;

    // Serializer options applied to JSON payloads sent and received by this client.
    private readonly JsonSerializerOptions jsonSerializerOptions;

    // gRPC channel supplied to the constructor.
    private readonly GrpcChannel channel;
    // Generated gRPC client used for the publish, binding, invocation and state calls.
    private readonly Autogenerated.Dapr.DaprClient client;
    // Optional header (name/value) that, when present, is added to created invocation requests.
    private readonly KeyValuePair<string, string>? apiTokenHeader;

    // Exposes the generated gRPC client so tests can reach it.
    internal Autogenerated.Dapr.DaprClient Client => client;

    /// <inheritdoc/>
    public override JsonSerializerOptions JsonSerializerOptions => jsonSerializerOptions;

    /// <summary>
    /// Initializes a new <see cref="DaprClientGrpc"/> from already-constructed transport components.
    /// </summary>
    /// <param name="channel">The gRPC channel to keep on this instance.</param>
    /// <param name="inner">The generated gRPC client used to issue calls.</param>
    /// <param name="httpClient">The HTTP client used for service invocation; a user-agent value is added to its default headers.</param>
    /// <param name="httpEndpoint">The base URI used for service invocation requests.</param>
    /// <param name="jsonSerializerOptions">The JSON serializer options used for payloads.</param>
    /// <param name="apiTokenHeader">An optional header name/value pair attached to invocation requests.</param>
    internal DaprClientGrpc(
        GrpcChannel channel,
        Autogenerated.Dapr.DaprClient inner,
        HttpClient httpClient,
        Uri httpEndpoint,
        JsonSerializerOptions jsonSerializerOptions,
        KeyValuePair<string, string>? apiTokenHeader)
    {
        // gRPC components.
        this.channel = channel;
        client = inner;

        // HTTP components.
        this.httpClient = httpClient;
        this.httpEndpoint = httpEndpoint;

        // Shared configuration.
        this.jsonSerializerOptions = jsonSerializerOptions;
        this.apiTokenHeader = apiTokenHeader;

        // Every request sent through this HttpClient carries the SDK user-agent value.
        var userAgentValues = httpClient.DefaultRequestHeaders.UserAgent;
        userAgentValues.Add(UserAgent());
    }

    #region Publish Apis

    /// <inheritdoc/>
    public override Task PublishEventAsync<TData>(
        string pubsubName,
        string topicName,
        TData data,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
        ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
        ArgumentVerifier.ThrowIfNull(data, nameof(data));

        var payload = TypeConverters.ToJsonByteString(data, this.JsonSerializerOptions);

        // Only CloudEvent payloads get an explicit content type; otherwise null is passed along.
        string contentType = null;
        if (data is CloudEvent)
        {
            contentType = Constants.ContentTypeCloudEvent;
        }

        return MakePublishRequest(
            pubsubName,
            topicName,
            payload,
            metadata: null,
            dataContentType: contentType,
            cancellationToken);
    }

    /// <inheritdoc/>
    public override Task PublishEventAsync<TData>(
        string pubsubName,
        string topicName,
        TData data,
        Dictionary<string, string> metadata,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
        ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
        ArgumentVerifier.ThrowIfNull(data, nameof(data));
        ArgumentVerifier.ThrowIfNull(metadata, nameof(metadata));

        var payload = TypeConverters.ToJsonByteString(data, this.JsonSerializerOptions);

        // Only CloudEvent payloads get an explicit content type; otherwise null is passed along.
        string contentType = null;
        if (data is CloudEvent)
        {
            contentType = Constants.ContentTypeCloudEvent;
        }

        return MakePublishRequest(pubsubName, topicName, payload, metadata, contentType, cancellationToken);
    }

    /// <inheritdoc/>
    public override Task PublishEventAsync(
        string pubsubName,
        string topicName,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
        ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));

        // No payload, metadata or content type accompanies this event.
        return MakePublishRequest(
            pubsubName,
            topicName,
            content: null,
            metadata: null,
            dataContentType: null,
            cancellationToken);
    }

    /// <inheritdoc/>
    public override Task PublishEventAsync(
        string pubsubName,
        string topicName,
        Dictionary<string, string> metadata,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
        ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
        ArgumentVerifier.ThrowIfNull(metadata, nameof(metadata));

        // The event carries metadata only: no payload and no content type.
        return MakePublishRequest(
            pubsubName,
            topicName,
            content: null,
            metadata,
            dataContentType: null,
            cancellationToken);
    }

    /// <inheritdoc/>
    public override Task PublishByteEventAsync(
        string pubsubName,
        string topicName,
        ReadOnlyMemory<byte> data,
        string dataContentType = Constants.ContentTypeApplicationJson,
        Dictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
        ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));

        // The bytes are forwarded without JSON serialization, with the caller-provided content type.
        var payload = ByteString.CopyFrom(data.Span);
        return MakePublishRequest(pubsubName, topicName, payload, metadata, dataContentType, cancellationToken);
    }

    /// <summary>
    /// Builds a publish request from the given arguments and sends it through the gRPC client.
    /// </summary>
    /// <param name="pubsubName">The pubsub name set on the request.</param>
    /// <param name="topicName">The topic set on the request.</param>
    /// <param name="content">The payload; when <c>null</c>, neither data nor a content type is set.</param>
    /// <param name="metadata">Optional metadata copied onto the request.</param>
    /// <param name="dataContentType">The payload content type; a <c>null</c> value falls back to <c>Constants.ContentTypeApplicationJson</c>.</param>
    /// <param name="cancellationToken">Token passed into the call options.</param>
    /// <exception cref="DaprException">Thrown when the gRPC call fails with an <see cref="RpcException"/>.</exception>
    private async Task MakePublishRequest(
        string pubsubName,
        string topicName,
        ByteString content,
        Dictionary<string, string> metadata,
        string dataContentType,
        CancellationToken cancellationToken)
    {
        var request = new Autogenerated.PublishEventRequest { PubsubName = pubsubName, Topic = topicName };

        // The content type is only meaningful when there is a payload to describe.
        if (content is not null)
        {
            request.Data = content;
            request.DataContentType = dataContentType ?? Constants.ContentTypeApplicationJson;
        }

        if (metadata is not null)
        {
            foreach (var (key, value) in metadata)
            {
                request.Metadata.Add(key, value);
            }
        }

        var callOptions = CreateCallOptions(headers: null, cancellationToken);

        try
        {
            await client.PublishEventAsync(request, callOptions);
        }
        catch (RpcException rpcFailure)
        {
            throw new DaprException(
                "Publish operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcFailure);
        }
    }

    /// <inheritdoc/>
    public override Task<BulkPublishResponse<TValue>> BulkPublishEventAsync<TValue>(
        string pubsubName,
        string topicName,
        IReadOnlyList<TValue> events,
        Dictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        // Validate eagerly so argument errors surface before any request is built.
        ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
        ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
        ArgumentVerifier.ThrowIfNull(events, nameof(events));

        var pendingResponse = MakeBulkPublishRequest(pubsubName, topicName, events, metadata, cancellationToken);
        return pendingResponse;
    }

    /// <summary>
    /// Builds a bulk publish request with one entry per element of <paramref name="events"/>, sends it,
    /// and maps the failed entries reported in the response back to the entries that were sent.
    /// </summary>
    /// <typeparam name="TValue">The type of the events being published.</typeparam>
    /// <param name="pubsubName">The pubsub name set on the request.</param>
    /// <param name="topicName">The topic set on the request.</param>
    /// <param name="events">The events to publish; each one's index becomes its entry id.</param>
    /// <param name="metadata">Optional metadata copied onto every entry and onto the request itself.</param>
    /// <param name="cancellationToken">Token passed into the call options.</param>
    /// <returns>A response holding the entries that failed to publish.</returns>
    /// <exception cref="DaprException">Thrown when the gRPC call fails with an <see cref="RpcException"/>.</exception>
    private async Task<BulkPublishResponse<TValue>> MakeBulkPublishRequest<TValue>(
        string pubsubName,
        string topicName,
        IReadOnlyList<TValue> events,
        Dictionary<string, string> metadata,
        CancellationToken cancellationToken)
    {
        var request = new Autogenerated.BulkPublishRequest { PubsubName = pubsubName, Topic = topicName };

        // Lets failed entry ids from the response be resolved back to what was sent.
        var entriesById = new Dictionary<string, BulkPublishEntry<TValue>>();

        for (var index = 0; index < events.Count; index++)
        {
            var item = events[index];
            var entryId = index.ToString();
            var requestEntry = new Autogenerated.BulkPublishRequestEntry { EntryId = entryId };

            if (item is byte[] rawBytes)
            {
                // Byte arrays are sent as-is rather than JSON-serialized.
                requestEntry.Event = ByteString.CopyFrom(rawBytes);
                requestEntry.ContentType = Constants.ContentTypeApplicationOctetStream;
            }
            else
            {
                requestEntry.Event = TypeConverters.ToJsonByteString(item, jsonSerializerOptions);
                if (item is CloudEvent)
                {
                    requestEntry.ContentType = Constants.ContentTypeCloudEvent;
                }
                else
                {
                    requestEntry.ContentType = Constants.ContentTypeApplicationJson;
                }
            }

            if (metadata is not null)
            {
                requestEntry.Metadata.Add(metadata);
            }

            request.Entries.Add(requestEntry);

            var domainEntry = new BulkPublishEntry<TValue>(
                requestEntry.EntryId, item, requestEntry.ContentType, requestEntry.Metadata);
            entriesById.Add(entryId, domainEntry);
        }

        if (metadata is not null)
        {
            foreach (var (key, value) in metadata)
            {
                request.Metadata.Add(key, value);
            }
        }

        var callOptions = CreateCallOptions(headers: null, cancellationToken);

        try
        {
            var reply = await client.BulkPublishEventAlpha1Async(request, callOptions);

            var failures = reply.FailedEntries
                .Select(failed => new BulkPublishResponseFailedEntry<TValue>(entriesById[failed.EntryId], failed.Error))
                .ToList();

            return new BulkPublishResponse<TValue>(failures);
        }
        catch (RpcException rpcFailure)
        {
            throw new DaprException("Bulk Publish operation failed: the Dapr endpoint indicated a " +
                                    "failure. See InnerException for details.", rpcFailure);
        }
    }

    #endregion

    #region InvokeBinding Apis

    /// <inheritdoc/>
    public override async Task InvokeBindingAsync<TRequest>(
        string bindingName,
        string operation,
        TRequest data,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(bindingName, nameof(bindingName));
        ArgumentVerifier.ThrowIfNullOrEmpty(operation, nameof(operation));

        var payload = TypeConverters.ToJsonByteString(data, this.jsonSerializerOptions);

        // The binding's response is not needed by this overload.
        await MakeInvokeBindingRequestAsync(bindingName, operation, payload, metadata, cancellationToken);
    }

    /// <inheritdoc/>
    public override async Task<TResponse> InvokeBindingAsync<TRequest, TResponse>(
        string bindingName,
        string operation,
        TRequest data,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(bindingName, nameof(bindingName));
        ArgumentVerifier.ThrowIfNullOrEmpty(operation, nameof(operation));

        var payload = TypeConverters.ToJsonByteString(data, this.jsonSerializerOptions);
        var bindingResponse = await MakeInvokeBindingRequestAsync(
            bindingName, operation, payload, metadata, cancellationToken);

        try
        {
            var result = TypeConverters.FromJsonByteString<TResponse>(bindingResponse.Data, this.JsonSerializerOptions);
            return result;
        }
        catch (JsonException jsonFailure)
        {
            // Surface malformed response payloads as a Dapr-level failure.
            throw new DaprException(
                "Binding operation failed: the response payload could not be deserialized. See InnerException for details.",
                jsonFailure);
        }
    }

    /// <inheritdoc/>
    public override async Task<BindingResponse> InvokeBindingAsync(BindingRequest request,
        CancellationToken cancellationToken = default)
    {
        // Fail with a descriptive argument error instead of a NullReferenceException on request.Data below.
        ArgumentVerifier.ThrowIfNull(request, nameof(request));

        // The request's bytes are sent unchanged as the binding payload.
        var bytes = ByteString.CopyFrom(request.Data.Span);
        var response = await this.MakeInvokeBindingRequestAsync(request.BindingName, request.Operation, bytes,
            request.Metadata, cancellationToken);

        // Hand back the raw response bytes and metadata alongside the originating request.
        return new BindingResponse(request, response.Data.Memory, response.Metadata);
    }

    /// <summary>
    /// Builds a binding invocation request from the given arguments and sends it through the gRPC client.
    /// </summary>
    /// <param name="name">The binding name set on the request.</param>
    /// <param name="operation">The operation set on the request.</param>
    /// <param name="data">The payload; left unset on the request when <c>null</c>.</param>
    /// <param name="metadata">Optional metadata copied onto the request.</param>
    /// <param name="cancellationToken">Token passed into the call options.</param>
    /// <returns>The response returned by the gRPC call.</returns>
    /// <exception cref="DaprException">Thrown when the gRPC call fails with an <see cref="RpcException"/>.</exception>
    private async Task<Autogenerated.InvokeBindingResponse> MakeInvokeBindingRequestAsync(
        string name,
        string operation,
        ByteString data,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        var request = new Autogenerated.InvokeBindingRequest { Name = name, Operation = operation };

        if (data is not null)
        {
            request.Data = data;
        }

        if (metadata is not null)
        {
            foreach (var (key, value) in metadata)
            {
                request.Metadata.Add(key, value);
            }
        }

        var callOptions = CreateCallOptions(headers: null, cancellationToken);

        try
        {
            var bindingResponse = await client.InvokeBindingAsync(request, callOptions);
            return bindingResponse;
        }
        catch (RpcException rpcFailure)
        {
            throw new DaprException(
                "Binding operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcFailure);
        }
    }

    #endregion

    #region InvokeMethod Apis

    /// <summary>
    /// Creates an <see cref="HttpRequestMessage" /> that can be used to perform service invocation for the
    /// application identified by <paramref name="appId" /> and invokes the method specified by <paramref name="methodName" />
    /// with the HTTP method specified by <paramref name="httpMethod" />.
    /// </summary>
    /// <param name="httpMethod">The <see cref="HttpMethod" /> to use for the invocation request.</param>
    /// <param name="appId">The Dapr application id to invoke the method on.</param>
    /// <param name="methodName">The name of the method to invoke.</param>
    /// <returns>An <see cref="HttpRequestMessage" /> for use with <c>SendInvokeMethodRequestAsync</c>.</returns>
    public override HttpRequestMessage CreateInvokeMethodRequest(HttpMethod httpMethod, string appId, string methodName)
    {
        // No query string was requested, so an empty parameter list is supplied.
        var noQueryParameters = new List<KeyValuePair<string, string>>();
        return CreateInvokeMethodRequest(httpMethod, appId, methodName, noQueryParameters);
    }

    /// <summary>
    /// Creates an <see cref="HttpRequestMessage" /> that can be used to perform service invocation for the
    /// application identified by <paramref name="appId" /> and invokes the method specified by <paramref name="methodName" />
    /// with the HTTP method specified by <paramref name="httpMethod" />.
    /// </summary>
    /// <param name="httpMethod">The <see cref="HttpMethod" /> to use for the invocation request.</param>
    /// <param name="appId">The Dapr application id to invoke the method on.</param>
    /// <param name="methodName">The name of the method to invoke.</param>
    /// <param name="queryStringParameters">A collection of key/value pairs to populate the query string from.</param>
    /// <returns>An <see cref="HttpRequestMessage" /> for use with <c>SendInvokeMethodRequestAsync</c>.</returns>
    public override HttpRequestMessage CreateInvokeMethodRequest(HttpMethod httpMethod, string appId, string methodName,
        IReadOnlyCollection<KeyValuePair<string, string>> queryStringParameters)
    {
        ArgumentVerifier.ThrowIfNull(httpMethod, nameof(httpMethod));
        ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId));
        ArgumentVerifier.ThrowIfNull(methodName, nameof(methodName));

        // Path navigation operators such as `../..` in the inputs can still yield an invalid URI;
        // that is accepted on a garbage-in/garbage-out basis. Building the path this way avoids
        // some common pitfalls that could lead to undesired encoding.
        var trimmedMethodName = methodName.TrimStart('/');
        var relativePath = $"/v1.0/invoke/{appId}/method/{trimmedMethodName}";
        var targetUri = new Uri(this.httpEndpoint, relativePath).AddQueryParameters(queryStringParameters);

        var message = new HttpRequestMessage(httpMethod, targetUri);

        // Remember the target so failures can later be reported with the app id and method name.
        message.Options.Set(new HttpRequestOptionsKey<string>(AppIdKey), appId);
        message.Options.Set(new HttpRequestOptionsKey<string>(MethodNameKey), methodName);

        if (this.apiTokenHeader is { } tokenHeader)
        {
            message.Headers.TryAddWithoutValidation(tokenHeader.Key, tokenHeader.Value);
        }

        return message;
    }

    /// <summary>
    /// Creates an <see cref="HttpRequestMessage" /> that can be used to perform service invocation for the
    /// application identified by <paramref name="appId" /> and invokes the method specified by <paramref name="methodName" />
    /// with the HTTP method specified by <paramref name="httpMethod" /> and a JSON serialized request body specified by 
    /// <paramref name="data" />.
    /// </summary>
    /// <typeparam name="TRequest">The type of the data that will be JSON serialized and provided as the request body.</typeparam>
    /// <param name="httpMethod">The <see cref="HttpMethod" /> to use for the invocation request.</param>
    /// <param name="appId">The Dapr application id to invoke the method on.</param>
    /// <param name="methodName">The name of the method to invoke.</param>
    /// <param name="data">The data that will be JSON serialized and provided as the request body.</param>
    /// <param name="queryStringParameters">A collection of key/value pairs to populate the query string from.</param>
    /// <returns>An <see cref="HttpRequestMessage" /> for use with <c>SendInvokeMethodRequestAsync</c>.</returns>
    public override HttpRequestMessage CreateInvokeMethodRequest<TRequest>(HttpMethod httpMethod, string appId,
        string methodName,
        IReadOnlyCollection<KeyValuePair<string, string>> queryStringParameters, TRequest data)
    {
        ArgumentVerifier.ThrowIfNull(httpMethod, nameof(httpMethod));
        ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId));
        ArgumentVerifier.ThrowIfNull(methodName, nameof(methodName));

        // Build the body-less request first, then attach the JSON-serialized payload.
        var message = CreateInvokeMethodRequest(httpMethod, appId, methodName, queryStringParameters);
        var jsonBody = JsonContent.Create(data, options: this.JsonSerializerOptions);
        message.Content = jsonBody;
        return message;
    }

    /// <inheritdoc/>
    public override async Task<HttpResponseMessage> InvokeMethodWithResponseAsync(HttpRequestMessage request,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNull(request, nameof(request));

        // A request that carries a URI must target the configured Dapr HTTP endpoint.
        var targetUri = request.RequestUri;
        var targetsOtherEndpoint = targetUri != null && !this.httpEndpoint.IsBaseOf(targetUri);
        if (targetsOtherEndpoint)
        {
            throw new InvalidOperationException("The provided request URI is not a Dapr service invocation URI.");
        }

        // The status code is deliberately NOT validated here: this method lets callers
        // 'invoke' without an exception being raised for non-2xx responses.
        try
        {
            var response = await this.httpClient.SendAsync(request, cancellationToken);
            return response;
        }
        catch (HttpRequestException sendFailure)
        {
            // Requests created by this client carry these option keys; requests built elsewhere
            // may not, and that must not cause a failure here.
            var appIdKey = new HttpRequestOptionsKey<string>(AppIdKey);
            var methodNameKey = new HttpRequestOptionsKey<string>(MethodNameKey);
            request.Options.TryGetValue(appIdKey, out var appId);
            request.Options.TryGetValue(methodNameKey, out var methodName);

            throw new InvocationException(
                appId: appId,
                methodName: methodName,
                innerException: sendFailure,
                response: null);
        }
    }

    /// <summary>
    /// <para>
    /// Creates an <see cref="HttpClient"/> that can be used to perform Dapr service invocation using <see cref="HttpRequestMessage"/>
    /// objects.
    /// </para>
    /// <para>
    /// The client will read the <see cref="HttpRequestMessage.RequestUri" /> property, and 
    /// interpret the hostname as the destination <c>app-id</c>. The <see cref="HttpRequestMessage.RequestUri" /> 
    /// property will be replaced with a new URI with the authority section replaced by the instance's <see cref="httpEndpoint"/> value
    /// and the path portion of the URI rewritten to follow the format of a Dapr service invocation request.
    /// </para>
    /// </summary>
    /// <param name="appId">
    ///     An optional <c>app-id</c>. If specified, the <c>app-id</c> will be configured as the value of 
    ///     <see cref="HttpClient.BaseAddress" /> so that relative URIs can be used. This parameter is mandatory if your app-id contains at least one uppercase letter.
    ///     Requests that use an absolute URL with an app-id containing at least one uppercase letter will not work; the workaround is to create one HttpClient per app-id with the app-id parameter set.
    /// </param>
    /// <returns>An <see cref="HttpClient" /> that can be used to perform service invocation requests.</returns>
    /// <remarks>
    /// </remarks>
#nullable enable
    public override HttpClient CreateInvokableHttpClient(string? appId = null)
    {
        // Both values are optional: a missing endpoint or API token header is passed on as null.
        var endpointAddress = this.httpEndpoint?.AbsoluteUri;
        var apiTokenValue = this.apiTokenHeader?.Value;

        return DaprClient.CreateInvokeHttpClient(appId, endpointAddress, apiTokenValue);
    }
#nullable disable

    /// <inheritdoc/>
    public override async Task InvokeMethodAsync(HttpRequestMessage request,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNull(request, nameof(request));

        var httpResponse = await InvokeMethodWithResponseAsync(request, cancellationToken);

        try
        {
            httpResponse.EnsureSuccessStatusCode();
        }
        catch (HttpRequestException statusFailure)
        {
            // Requests created by this client carry these option keys; requests built elsewhere
            // may not, and that must not cause a failure here.
            var appIdKey = new HttpRequestOptionsKey<string>(AppIdKey);
            var methodNameKey = new HttpRequestOptionsKey<string>(MethodNameKey);
            request.Options.TryGetValue(appIdKey, out var appId);
            request.Options.TryGetValue(methodNameKey, out var methodName);

            throw new InvocationException(
                appId: appId,
                methodName: methodName,
                innerException: statusFailure,
                response: httpResponse);
        }
    }

    /// <inheritdoc/>
    public override async Task<TResponse> InvokeMethodAsync<TResponse>(HttpRequestMessage request,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNull(request, nameof(request));

        var response = await InvokeMethodWithResponseAsync(request, cancellationToken);

        // Builds the exception reported for any failure below. Requests created by this client carry
        // the app id / method name option keys; requests built elsewhere may not, and that is tolerated.
        InvocationException ToInvocationException(Exception cause)
        {
            request.Options.TryGetValue(new HttpRequestOptionsKey<string>(AppIdKey), out var appId);
            request.Options.TryGetValue(new HttpRequestOptionsKey<string>(MethodNameKey), out var methodName);

            return new InvocationException(
                appId: appId,
                methodName: methodName,
                innerException: cause,
                response: response);
        }

        // Step 1: a non-success status code is reported as an invocation failure.
        try
        {
            response.EnsureSuccessStatusCode();
        }
        catch (HttpRequestException ex)
        {
            throw ToInvocationException(ex);
        }

        // Step 2: read and deserialize the JSON body.
        try
        {
            return await response.Content.ReadFromJsonAsync<TResponse>(this.jsonSerializerOptions, cancellationToken);
        }
        catch (HttpRequestException ex)
        {
            throw ToInvocationException(ex);
        }
        catch (JsonException ex)
        {
            throw ToInvocationException(ex);
        }
    }

    /// <inheritdoc/>
    public override async Task InvokeMethodGrpcAsync(string appId, string methodName,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId));
        ArgumentVerifier.ThrowIfNullOrEmpty(methodName, nameof(methodName));

        // Only the method name is sent: no payload and no content type.
        var invokeRequest = new Autogenerated.InvokeRequest { Method = methodName };
        var serviceRequest = new Autogenerated.InvokeServiceRequest { Id = appId, Message = invokeRequest };

        var callOptions = CreateCallOptions(headers: null, cancellationToken);

        try
        {
            // The response is not needed by this overload.
            await this.Client.InvokeServiceAsync(serviceRequest, callOptions);
        }
        catch (RpcException rpcFailure)
        {
            throw new InvocationException(appId, methodName, rpcFailure);
        }
    }

    /// <inheritdoc/>
    public override async Task InvokeMethodGrpcAsync<TRequest>(string appId, string methodName, TRequest data,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId));
        ArgumentVerifier.ThrowIfNullOrEmpty(methodName, nameof(methodName));

        // The request message is packed into an Any and tagged with the gRPC content type.
        var invokeRequest = new Autogenerated.InvokeRequest
        {
            Method = methodName,
            ContentType = Constants.ContentTypeApplicationGrpc,
            Data = Any.Pack(data),
        };
        var serviceRequest = new Autogenerated.InvokeServiceRequest { Id = appId, Message = invokeRequest };

        var callOptions = CreateCallOptions(headers: null, cancellationToken);

        try
        {
            // The response is not needed by this overload.
            await this.Client.InvokeServiceAsync(serviceRequest, callOptions);
        }
        catch (RpcException rpcFailure)
        {
            throw new InvocationException(appId, methodName, rpcFailure);
        }
    }

    /// <inheritdoc/>
    public override async Task<TResponse> InvokeMethodGrpcAsync<TResponse>(string appId, string methodName,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId));
        ArgumentVerifier.ThrowIfNullOrEmpty(methodName, nameof(methodName));

        // Only the method name is sent: no payload and no content type.
        var invokeRequest = new Autogenerated.InvokeRequest { Method = methodName };
        var serviceRequest = new Autogenerated.InvokeServiceRequest { Id = appId, Message = invokeRequest };

        var callOptions = CreateCallOptions(headers: null, cancellationToken);

        try
        {
            var invokeResponse = await this.Client.InvokeServiceAsync(serviceRequest, callOptions);

            // The response payload is an Any that is unpacked into the requested message type.
            var unpacked = invokeResponse.Data.Unpack<TResponse>();
            return unpacked;
        }
        catch (RpcException rpcFailure)
        {
            throw new InvocationException(appId, methodName, rpcFailure);
        }
    }

    /// <inheritdoc/>
    public override async Task<TResponse> InvokeMethodGrpcAsync<TRequest, TResponse>(string appId, string methodName,
        TRequest data, CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId));
        ArgumentVerifier.ThrowIfNullOrEmpty(methodName, nameof(methodName));

        // The request message is packed into an Any and tagged with the gRPC content type.
        var invokeRequest = new Autogenerated.InvokeRequest
        {
            Method = methodName,
            ContentType = Constants.ContentTypeApplicationGrpc,
            Data = Any.Pack(data),
        };
        var serviceRequest = new Autogenerated.InvokeServiceRequest { Id = appId, Message = invokeRequest };

        var callOptions = CreateCallOptions(headers: null, cancellationToken);

        try
        {
            var invokeResponse = await this.Client.InvokeServiceAsync(serviceRequest, callOptions);

            // The response payload is an Any that is unpacked into the requested message type.
            var unpacked = invokeResponse.Data.Unpack<TResponse>();
            return unpacked;
        }
        catch (RpcException rpcFailure)
        {
            throw new InvocationException(appId, methodName, rpcFailure);
        }
    }

    #endregion

    #region State Apis

    /// <inheritdoc />
    public override async Task<IReadOnlyList<BulkStateItem>> GetBulkStateAsync(string storeName,
        IReadOnlyList<string> keys, int? parallelism, IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        var rawItems = await GetBulkStateRawAsync(storeName, keys, parallelism, metadata, cancellationToken);

        // This overload exposes each value as its UTF-8 string form rather than deserializing it.
        var results = new List<BulkStateItem>();
        foreach (var (key, etag, value) in rawItems)
        {
            var text = value.ToStringUtf8();
            results.Add(new BulkStateItem(key, text, etag));
        }

        return results;
    }

    /// <inheritdoc/>
    public override async Task<IReadOnlyList<BulkStateItem<TValue>>> GetBulkStateAsync<TValue>(
        string storeName,
        IReadOnlyList<string> keys,
        int? parallelism,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        var rawItems = await GetBulkStateRawAsync(storeName, keys, parallelism, metadata, cancellationToken);

        // Each raw value is deserialized from JSON into TValue with the client's serializer options.
        var results = new List<BulkStateItem<TValue>>();
        foreach (var (key, etag, value) in rawItems)
        {
            var typedValue = TypeConverters.FromJsonByteString<TValue>(value, this.JsonSerializerOptions);
            results.Add(new BulkStateItem<TValue>(key, typedValue, etag));
        }

        return results;
    }

    /// <summary>
    /// Retrieves the bulk state data, but rather than deserializing the values, leaves the specific handling
    /// to the public callers of this method to avoid duplicate deserialization.
    /// </summary>
    /// <param name="storeName">The name of the state store to query; must not be null or empty.</param>
    /// <param name="keys">The keys to retrieve; must not be null and must contain at least one element.</param>
    /// <param name="parallelism">The parallelism value sent with the request; the type's default is sent when <c>null</c>.</param>
    /// <param name="metadata">Optional metadata copied onto the request.</param>
    /// <param name="cancellationToken">Token passed into the call options.</param>
    /// <returns>One tuple per item in the response, holding its key, etag and raw value.</returns>
    /// <exception cref="ArgumentException">Thrown when <paramref name="keys"/> is empty.</exception>
    /// <exception cref="DaprException">Thrown when the gRPC call fails with an <see cref="RpcException"/>.</exception>
    private async Task<IReadOnlyList<(string Key, string Etag, ByteString Value)>> GetBulkStateRawAsync(
        string storeName,
        IReadOnlyList<string> keys,
        int? parallelism,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));

        // Reject a null key list with an argument error instead of a NullReferenceException on keys.Count.
        ArgumentVerifier.ThrowIfNull(keys, nameof(keys));
        if (keys.Count == 0)
        {
            throw new ArgumentException("keys do not contain any elements");
        }

        var envelope = new Autogenerated.GetBulkStateRequest()
        {
            StoreName = storeName, Parallelism = parallelism ?? default
        };

        if (metadata != null)
        {
            foreach (var kvp in metadata)
            {
                envelope.Metadata.Add(kvp.Key, kvp.Value);
            }
        }

        envelope.Keys.AddRange(keys);

        var options = CreateCallOptions(headers: null, cancellationToken);
        Autogenerated.GetBulkStateResponse response;

        try
        {
            response = await client.GetBulkStateAsync(envelope, options);
        }
        catch (RpcException ex)
        {
            throw new DaprException(
                "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                ex);
        }

        // Values are left as raw bytes so each public caller can convert them exactly once.
        var bulkResponse = new List<(string Key, string Etag, ByteString Value)>();
        foreach (var item in response.Items)
        {
            bulkResponse.Add((item.Key, item.Etag, item.Data));
        }

        return bulkResponse;
    }

    /// <inheritdoc/>
    public override async Task<TValue> GetStateAsync<TValue>(
        string storeName,
        string key,
        ConsistencyMode? consistencyMode = default,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

        var request = new Autogenerated.GetStateRequest { StoreName = storeName, Key = key };

        if (metadata is not null)
        {
            foreach (var (metadataKey, metadataValue) in metadata)
            {
                request.Metadata.Add(metadataKey, metadataValue);
            }
        }

        // The consistency field is only set when the caller asked for a specific mode.
        if (consistencyMode is { } requestedMode)
        {
            request.Consistency = GetStateConsistencyForConsistencyMode(requestedMode);
        }

        var callOptions = CreateCallOptions(headers: null, cancellationToken);

        // Transport failures and payload failures are reported with distinct messages.
        Autogenerated.GetStateResponse stateResponse;
        try
        {
            stateResponse = await client.GetStateAsync(request, callOptions);
        }
        catch (RpcException rpcFailure)
        {
            throw new DaprException(
                "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcFailure);
        }

        try
        {
            var state = TypeConverters.FromJsonByteString<TValue>(stateResponse.Data, this.JsonSerializerOptions);
            return state;
        }
        catch (JsonException jsonFailure)
        {
            throw new DaprException(
                "State operation failed: the state payload could not be deserialized. See InnerException for details.",
                jsonFailure);
        }
    }

    /// <inheritdoc />
    public override async Task SaveBulkStateAsync<TValue>(string storeName, IReadOnlyList<SaveStateItem<TValue>> items,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));

        // Reject a null list explicitly instead of failing with a NullReferenceException on items.Count.
        ArgumentVerifier.ThrowIfNull(items, nameof(items));

        if (items.Count == 0)
        {
            throw new ArgumentException("items do not contain any elements");
        }

        var envelope = new Autogenerated.SaveStateRequest() { StoreName = storeName, };

        // Translate every item into a StateItem; ETag, metadata, options and value are all optional.
        foreach (var item in items)
        {
            var stateItem = new Autogenerated.StateItem() { Key = item.Key, };

            if (item.ETag != null)
            {
                stateItem.Etag = new Autogenerated.Etag() { Value = item.ETag };
            }

            if (item.Metadata != null)
            {
                foreach (var kvp in item.Metadata)
                {
                    stateItem.Metadata.Add(kvp.Key, kvp.Value);
                }
            }

            if (item.StateOptions != null)
            {
                stateItem.Options = ToAutoGeneratedStateOptions(item.StateOptions);
            }

            if (item.Value != null)
            {
                // Values are stored as JSON using this client's serializer options.
                stateItem.Value = TypeConverters.ToJsonByteString(item.Value, this.jsonSerializerOptions);
            }

            envelope.States.Add(stateItem);
        }

        // Build the call options through CreateCallOptions, as the single-item state operations do,
        // so that any headers it attaches are also sent with bulk saves.
        var options = CreateCallOptions(headers: null, cancellationToken);

        try
        {
            await client.SaveStateAsync(envelope, options);
        }
        catch (RpcException ex)
        {
            throw new DaprException(
                "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
        }
    }

    /// <inheritdoc/>
    public override async Task SaveByteStateAsync(
        string storeName,
        string key,
        ReadOnlyMemory<byte> value,
        StateOptions stateOptions = default,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

        // Copy the caller's bytes into a ByteString and save without an ETag; the boolean
        // result of the shared save routine is not needed here.
        var payload = ByteString.CopyFrom(value.Span);
        await this.MakeSaveByteStateCallAsync(storeName, key, payload, null, stateOptions, metadata,
            cancellationToken);
    }

    /// <inheritdoc/>
    public override async Task<bool> TrySaveByteStateAsync(
        string storeName,
        string key,
        ReadOnlyMemory<byte> value,
        string etag,
        StateOptions stateOptions = default,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));
        // Only null is rejected for the ETag; an empty string is passed through.
        ArgumentVerifier.ThrowIfNull(etag, nameof(etag));

        var payload = ByteString.CopyFrom(value.Span);
        var saved = await this.MakeSaveByteStateCallAsync(
            storeName,
            key,
            payload,
            etag,
            stateOptions,
            metadata,
            cancellationToken);
        return saved;
    }

    // Shared implementation behind the byte-oriented save methods: saves a raw binary value under
    // a single key. Returns false only when an ETag was supplied and the call was Aborted.
    private async Task<bool> MakeSaveByteStateCallAsync(
        string storeName,
        string key,
        ByteString value,
        string etag = default,
        StateOptions stateOptions = default,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        var request = new Autogenerated.SaveStateRequest { StoreName = storeName };
        var entry = new Autogenerated.StateItem { Key = key };

        if (metadata is not null)
        {
            foreach (var (metaKey, metaValue) in metadata)
            {
                entry.Metadata.Add(metaKey, metaValue);
            }
        }

        if (etag is not null)
        {
            entry.Etag = new Autogenerated.Etag { Value = etag };
        }

        if (stateOptions is not null)
        {
            entry.Options = ToAutoGeneratedStateOptions(stateOptions);
        }

        if (value is not null)
        {
            entry.Value = value;
        }

        request.States.Add(entry);

        var callOptions = CreateCallOptions(headers: null, cancellationToken);
        try
        {
            await client.SaveStateAsync(request, callOptions);
        }
        catch (RpcException rpcError)
        {
            // An Aborted status on a save that carried an ETag indicates an ETag mismatch; that case
            // is surfaced through the Try... pattern (a false result) rather than an exception.
            if (etag is not null && rpcError.StatusCode == StatusCode.Aborted)
            {
                return false;
            }

            throw new DaprException(
                "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcError);
        }

        return true;
    }

    /// <inheritdoc/>
    public override async Task<ReadOnlyMemory<byte>> GetByteStateAsync(
        string storeName,
        string key,
        ConsistencyMode? consistencyMode = default,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

        var request = new Autogenerated.GetStateRequest();
        request.StoreName = storeName;
        request.Key = key;

        if (consistencyMode is { } mode)
        {
            request.Consistency = GetStateConsistencyForConsistencyMode(mode);
        }

        if (metadata is not null)
        {
            foreach (var (metaKey, metaValue) in metadata)
            {
                request.Metadata.Add(metaKey, metaValue);
            }
        }

        var callOptions = CreateCallOptions(headers: null, cancellationToken);
        try
        {
            var reply = await client.GetStateAsync(request, callOptions);

            // Hand back a copy of the stored bytes exactly as received; nothing is deserialized.
            byte[] rawBytes = reply.Data.ToByteArray();
            return rawBytes.AsMemory();
        }
        catch (RpcException rpcError)
        {
            throw new DaprException(
                "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcError);
        }
    }

    /// <inheritdoc/>
    public override async Task<(ReadOnlyMemory<byte>, string etag)> GetByteStateAndETagAsync(
        string storeName,
        string key,
        ConsistencyMode? consistencyMode = default,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

        var envelope = new Autogenerated.GetStateRequest() { StoreName = storeName, Key = key };

        if (metadata != null)
        {
            foreach (var kvp in metadata)
            {
                envelope.Metadata.Add(kvp.Key, kvp.Value);
            }
        }

        if (consistencyMode != null)
        {
            envelope.Consistency = GetStateConsistencyForConsistencyMode(consistencyMode.Value);
        }

        var options = CreateCallOptions(headers: null, cancellationToken);
        Autogenerated.GetStateResponse response;

        try
        {
            response = await client.GetStateAsync(envelope, options);
        }
        catch (RpcException ex)
        {
            throw new DaprException(
                "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                ex);
        }

        // The payload is returned as raw bytes together with the ETag. No JSON deserialization
        // takes place, so there is no JsonException to translate here.
        return (response.Data.ToByteArray().AsMemory(), response.Etag);
    }

    /// <inheritdoc />
    public override async Task DeleteBulkStateAsync(string storeName, IReadOnlyList<BulkDeleteStateItem> items,
        CancellationToken cancellationToken = default)
    {
        // Validate arguments up front, matching the other state operations in this class.
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNull(items, nameof(items));

        var envelope = new Autogenerated.DeleteBulkStateRequest() { StoreName = storeName, };

        // Translate every item into a StateItem; ETag, metadata and options are all optional.
        foreach (var item in items)
        {
            var stateItem = new Autogenerated.StateItem() { Key = item.Key, };

            if (item.ETag != null)
            {
                stateItem.Etag = new Autogenerated.Etag() { Value = item.ETag };
            }

            if (item.Metadata != null)
            {
                foreach (var kvp in item.Metadata)
                {
                    stateItem.Metadata.Add(kvp.Key, kvp.Value);
                }
            }

            if (item.StateOptions != null)
            {
                stateItem.Options = ToAutoGeneratedStateOptions(item.StateOptions);
            }

            envelope.States.Add(stateItem);
        }

        // Build the call options through CreateCallOptions, as the single-item state operations do,
        // so that any headers it attaches are also sent with bulk deletes.
        var options = CreateCallOptions(headers: null, cancellationToken);

        try
        {
            await client.DeleteBulkStateAsync(envelope, options);
        }
        catch (RpcException ex)
        {
            throw new DaprException(
                "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
        }
    }

    /// <inheritdoc/>
    public override async Task<(TValue value, string etag)> GetStateAndETagAsync<TValue>(
        string storeName,
        string key,
        ConsistencyMode? consistencyMode = null,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

        var request = new Autogenerated.GetStateRequest();
        request.StoreName = storeName;
        request.Key = key;

        if (metadata is not null)
        {
            foreach (var (metaKey, metaValue) in metadata)
            {
                request.Metadata.Add(metaKey, metaValue);
            }
        }

        if (consistencyMode is { } mode)
        {
            request.Consistency = GetStateConsistencyForConsistencyMode(mode);
        }

        var callOptions = CreateCallOptions(headers: null, cancellationToken);
        Autogenerated.GetStateResponse reply;

        try
        {
            reply = await client.GetStateAsync(request, callOptions);
        }
        catch (RpcException rpcError)
        {
            throw new DaprException(
                "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcError);
        }

        try
        {
            // Deserialize the JSON payload and pair it with the ETag reported for the key.
            var state = TypeConverters.FromJsonByteString<TValue>(reply.Data, this.JsonSerializerOptions);
            return (state, reply.Etag);
        }
        catch (JsonException jsonError)
        {
            throw new DaprException(
                "State operation failed: the state payload could not be deserialized. See InnerException for details.",
                jsonError);
        }
    }

    /// <inheritdoc/>
    public override async Task SaveStateAsync<TValue>(
        string storeName,
        string key,
        TValue value,
        StateOptions stateOptions = default,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

        // Unconditional save: no ETag is sent, and the boolean result of the shared routine is ignored.
        await this.MakeSaveStateCallAsync(storeName, key, value, null, stateOptions, metadata, cancellationToken);
    }

    /// <inheritdoc/>
    public override async Task<bool> TrySaveStateAsync<TValue>(
        string storeName,
        string key,
        TValue value,
        string etag,
        StateOptions stateOptions = default,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

        // Only a null ETag is rejected. Some state stores accept an empty ETag, so an empty value is
        // passed through and any resulting error from the Dapr runtime is allowed to bubble up.
        ArgumentVerifier.ThrowIfNull(etag, nameof(etag));

        var saved = await this.MakeSaveStateCallAsync(
            storeName,
            key,
            value,
            etag,
            stateOptions,
            metadata,
            cancellationToken);
        return saved;
    }

    // Shared implementation behind SaveStateAsync/TrySaveStateAsync: saves one JSON-serialized value.
    // Returns false only when an ETag was supplied and the call was Aborted.
    private async Task<bool> MakeSaveStateCallAsync<TValue>(
        string storeName,
        string key,
        TValue value,
        string etag = default,
        StateOptions stateOptions = default,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        var request = new Autogenerated.SaveStateRequest { StoreName = storeName };
        var entry = new Autogenerated.StateItem { Key = key };

        if (metadata is not null)
        {
            foreach (var (metaKey, metaValue) in metadata)
            {
                entry.Metadata.Add(metaKey, metaValue);
            }
        }

        if (etag is not null)
        {
            entry.Etag = new Autogenerated.Etag { Value = etag };
        }

        if (stateOptions is not null)
        {
            entry.Options = ToAutoGeneratedStateOptions(stateOptions);
        }

        if (value is not null)
        {
            // The value is stored as JSON using this client's serializer options.
            entry.Value = TypeConverters.ToJsonByteString(value, this.jsonSerializerOptions);
        }

        request.States.Add(entry);

        var callOptions = CreateCallOptions(headers: null, cancellationToken);
        try
        {
            await client.SaveStateAsync(request, callOptions);
        }
        catch (RpcException rpcError)
        {
            // An Aborted status on a save that carried an ETag indicates an ETag mismatch; that case
            // is surfaced through the Try... pattern (a false result) rather than an exception.
            if (etag is not null && rpcError.StatusCode == StatusCode.Aborted)
            {
                return false;
            }

            throw new DaprException(
                "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcError);
        }

        return true;
    }


    /// <inheritdoc/>
    public override async Task ExecuteStateTransactionAsync(
        string storeName,
        IReadOnlyList<StateTransactionRequest> operations,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNull(operations, nameof(operations));

        // A transaction needs at least one operation.
        if (operations.Count < 1)
        {
            throw new ArgumentException($"{nameof(operations)} does not contain any elements");
        }

        await this.MakeExecuteStateTransactionCallAsync(storeName, operations, metadata, cancellationToken);
    }

    // Builds and sends the ExecuteStateTransaction request for the given operations.
    private async Task MakeExecuteStateTransactionCallAsync(
        string storeName,
        IReadOnlyList<StateTransactionRequest> states,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        var envelope = new Autogenerated.ExecuteStateTransactionRequest() { StoreName = storeName, };

        foreach (var state in states)
        {
            var stateOperation = new Autogenerated.TransactionalStateOperation
            {
                // The operation type goes on the wire as a lower-cased name. Use the invariant culture
                // so the result does not depend on the current thread culture.
                OperationType = state.OperationType.ToString()?.ToLowerInvariant(),
                Request = ToAutogeneratedStateItem(state)
            };

            envelope.Operations.Add(stateOperation);
        }

        // Add metadata that applies to all operations if specified
        if (metadata != null)
        {
            foreach (var kvp in metadata)
            {
                envelope.Metadata.Add(kvp.Key, kvp.Value);
            }
        }

        var options = CreateCallOptions(headers: null, cancellationToken);
        try
        {
            await client.ExecuteStateTransactionAsync(envelope, options);
        }
        catch (RpcException ex)
        {
            throw new DaprException(
                "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
        }
    }

    // Converts a transaction request into the generated StateItem message, copying only the
    // optional parts (value, ETag, metadata, options) that are present.
    private Autogenerated.StateItem ToAutogeneratedStateItem(StateTransactionRequest state)
    {
        var item = new Autogenerated.StateItem();
        item.Key = state.Key;

        if (state.Value is { } payload)
        {
            item.Value = ByteString.CopyFrom(payload);
        }

        if (state.ETag is { } etag)
        {
            item.Etag = new Autogenerated.Etag { Value = etag };
        }

        if (state.Metadata is { } entries)
        {
            foreach (var entry in entries)
            {
                item.Metadata.Add(entry.Key, entry.Value);
            }
        }

        if (state.Options is { } stateOptions)
        {
            item.Options = ToAutoGeneratedStateOptions(stateOptions);
        }

        return item;
    }

    /// <inheritdoc/>
    public override async Task DeleteStateAsync(
        string storeName,
        string key,
        StateOptions stateOptions = default,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

        // Unconditional delete: no ETag is sent, and the boolean result of the shared routine is ignored.
        await this.MakeDeleteStateCallAsync(storeName, key, null, stateOptions, metadata, cancellationToken);
    }

    /// <inheritdoc/>
    public override async Task<bool> TryDeleteStateAsync(
        string storeName,
        string key,
        string etag,
        StateOptions stateOptions = default,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

        // Only a null ETag is rejected. Some state stores accept an empty ETag, so an empty value is
        // passed through and any resulting error from the Dapr runtime is allowed to bubble up.
        ArgumentVerifier.ThrowIfNull(etag, nameof(etag));

        var deleted = await this.MakeDeleteStateCallAsync(
            storeName,
            key,
            etag,
            stateOptions,
            metadata,
            cancellationToken);
        return deleted;
    }

    // Shared implementation behind DeleteStateAsync/TryDeleteStateAsync.
    // Returns false only when an ETag was supplied and the call was Aborted.
    private async Task<bool> MakeDeleteStateCallAsync(
        string storeName,
        string key,
        string etag = default,
        StateOptions stateOptions = default,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        var request = new Autogenerated.DeleteStateRequest();
        request.StoreName = storeName;
        request.Key = key;

        if (metadata is not null)
        {
            foreach (var (metaKey, metaValue) in metadata)
            {
                request.Metadata.Add(metaKey, metaValue);
            }
        }

        if (etag is not null)
        {
            request.Etag = new Autogenerated.Etag { Value = etag };
        }

        if (stateOptions is not null)
        {
            request.Options = ToAutoGeneratedStateOptions(stateOptions);
        }

        var callOptions = CreateCallOptions(headers: null, cancellationToken);

        try
        {
            await client.DeleteStateAsync(request, callOptions);
        }
        catch (RpcException rpcError)
        {
            // An Aborted status on a delete that carried an ETag indicates an ETag mismatch; that case
            // is surfaced through the Try... pattern (a false result) rather than an exception.
            if (etag is not null && rpcError.StatusCode == StatusCode.Aborted)
            {
                return false;
            }

            throw new DaprException(
                "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcError);
        }

        return true;
    }

    /// <inheritdoc/>
    public async override Task<StateQueryResponse<TValue>> QueryStateAsync<TValue>(
        string storeName,
        string jsonQuery,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        // Validate arguments up front, matching the other state operations in this class.
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(jsonQuery, nameof(jsonQuery));

        var queryRequest = new Autogenerated.QueryStateRequest() { StoreName = storeName, Query = jsonQuery };

        if (metadata != null)
        {
            foreach (var kvp in metadata)
            {
                queryRequest.Metadata.Add(kvp.Key, kvp.Value);
            }
        }

        var options = CreateCallOptions(headers: null, cancellationToken);

        try
        {
            var items = new List<StateQueryItem<TValue>>();
            var failedKeys = new List<string>();
            var queryResponse = await client.QueryStateAlpha1Async(queryRequest, options);
            foreach (var item in queryResponse.Results)
            {
                if (!string.IsNullOrEmpty(item.Error))
                {
                    // When we encounter an error, we record the key and prepare to throw an exception at the end of the results.
                    failedKeys.Add(item.Key);
                    continue;
                }

                items.Add(new StateQueryItem<TValue>(item.Key,
                    TypeConverters.FromJsonByteString<TValue>(item.Data, this.JsonSerializerOptions), item.Etag,
                    item.Error));
            }

            var results = new StateQueryResponse<TValue>(items, queryResponse.Token, queryResponse.Metadata);
            if (failedKeys.Count > 0)
            {
                // We encountered some bad keys so we throw instead of returning to alert the user.
                // The exception carries the successfully read items alongside the failed keys.
                throw new StateQueryException<TValue>("Encountered an error while processing state query results.",
                    results, failedKeys);
            }

            return results;
        }
        catch (RpcException ex)
        {
            throw new DaprException(
                "Query state operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                ex);
        }
        catch (JsonException ex)
        {
            throw new DaprException(
                "State operation failed: the state payload could not be deserialized. See InnerException for details.",
                ex);
        }
    }

    #endregion

    #region Secret Apis

    /// <inheritdoc/>
    public async override Task<Dictionary<string, string>> GetSecretAsync(
        string storeName,
        string key,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

        var request = new Autogenerated.GetSecretRequest();
        request.StoreName = storeName;
        request.Key = key;

        if (metadata is not null)
        {
            foreach (var (metaKey, metaValue) in metadata)
            {
                request.Metadata.Add(metaKey, metaValue);
            }
        }

        var callOptions = CreateCallOptions(headers: null, cancellationToken);
        Autogenerated.GetSecretResponse reply;

        try
        {
            reply = await client.GetSecretAsync(request, callOptions);
        }
        catch (RpcException rpcError)
        {
            throw new DaprException(
                "Secret operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcError);
        }

        // Copy the response map into a plain dictionary for the caller.
        var secrets = new Dictionary<string, string>();
        foreach (var entry in reply.Data)
        {
            secrets.Add(entry.Key, entry.Value);
        }

        return secrets;
    }

    /// <inheritdoc/>
    public async override Task<Dictionary<string, Dictionary<string, string>>> GetBulkSecretAsync(
        string storeName,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        // Validate the store name up front, as the single-secret lookup does.
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));

        var envelope = new Autogenerated.GetBulkSecretRequest() { StoreName = storeName };

        if (metadata != null)
        {
            foreach (var kvp in metadata)
            {
                envelope.Metadata.Add(kvp.Key, kvp.Value);
            }
        }

        var options = CreateCallOptions(headers: null, cancellationToken);
        Autogenerated.GetBulkSecretResponse response;

        try
        {
            response = await client.GetBulkSecretAsync(envelope, options);
        }
        catch (RpcException ex)
        {
            throw new DaprException(
                "Bulk secret operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                ex);
        }

        // Each top-level entry maps to its own key/value map; copy both levels into plain dictionaries.
        return response.Data.ToDictionary(r => r.Key, r => r.Value.Secrets.ToDictionary(s => s.Key, s => s.Value));
    }

    #endregion

    #region Configuration API

    /// <inheritdoc/>
    public async override Task<GetConfigurationResponse> GetConfiguration(
        string storeName,
        IReadOnlyList<string> keys,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));

        var configRequest = new Autogenerated.GetConfigurationRequest();
        configRequest.StoreName = storeName;

        // Keys are optional: a null or empty list sends the request with no keys at all.
        if (keys != null && keys.Count > 0)
        {
            configRequest.Keys.AddRange(keys);
        }

        if (metadata is not null)
        {
            foreach (var (metaKey, metaValue) in metadata)
            {
                configRequest.Metadata.Add(metaKey, metaValue);
            }
        }

        var callOptions = CreateCallOptions(headers: null, cancellationToken);
        Autogenerated.GetConfigurationResponse reply;
        try
        {
            reply = await client.GetConfigurationAsync(configRequest, callOptions);
        }
        catch (RpcException rpcError)
        {
            throw new DaprException(
                "GetConfiguration operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcError);
        }

        // Convert every returned item into the SDK's ConfigurationItem, keyed by configuration key.
        var configuration = new Dictionary<string, ConfigurationItem>();
        foreach (var entry in reply.Items)
        {
            var raw = entry.Value;
            configuration.Add(entry.Key, new ConfigurationItem(raw.Value, raw.Version, raw.Metadata));
        }

        return new GetConfigurationResponse(configuration);
    }

    /// <inheritdoc/>
    public override Task<SubscribeConfigurationResponse> SubscribeConfiguration(
        string storeName,
        IReadOnlyList<string> keys,
        IReadOnlyDictionary<string, string> metadata = default,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));

        var subscribeRequest = new Autogenerated.SubscribeConfigurationRequest { StoreName = storeName };

        // Keys are optional: a null or empty list sends the request with no keys at all.
        if (keys is { Count: > 0 })
        {
            subscribeRequest.Keys.AddRange(keys);
        }

        if (metadata is not null)
        {
            foreach (var (metaKey, metaValue) in metadata)
            {
                subscribeRequest.Metadata.Add(metaKey, metaValue);
            }
        }

        var callOptions = CreateCallOptions(headers: null, cancellationToken: cancellationToken);

        // Start the streaming call and wrap it in a source object; nothing is awaited here,
        // so the response is returned as an already-completed task.
        var streamingCall = client.SubscribeConfiguration(subscribeRequest, callOptions);
        var source = new DaprSubscribeConfigurationSource(streamingCall);
        return Task.FromResult(new SubscribeConfigurationResponse(source));
    }

    /// <inheritdoc/>
    public override async Task<UnsubscribeConfigurationResponse> UnsubscribeConfiguration(
        string storeName,
        string id,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(id, nameof(id));

        var unsubscribeRequest = new Autogenerated.UnsubscribeConfigurationRequest();
        unsubscribeRequest.StoreName = storeName;
        unsubscribeRequest.Id = id;

        // An RpcException from this call is not wrapped; it propagates to the caller as-is.
        var callOptions = CreateCallOptions(headers: null, cancellationToken);
        var reply = await client.UnsubscribeConfigurationAsync(unsubscribeRequest, callOptions);

        return new UnsubscribeConfigurationResponse(reply.Ok, reply.Message);
    }

    #endregion

    #region Cryptography

    /// <inheritdoc />
    [Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    public override async Task<ReadOnlyMemory<byte>> EncryptAsync(string vaultResourceName,
        ReadOnlyMemory<byte> plaintextBytes, string keyName, EncryptionOptions encryptionOptions,
        CancellationToken cancellationToken = default)
    {
        // Expose the in-memory plaintext as a stream so the streaming overload can do the work.
        using var plaintextStream = plaintextBytes.CreateMemoryStream(true);
        var encryptedChunks = EncryptAsync(vaultResourceName, plaintextStream, keyName, encryptionOptions,
            cancellationToken);

        // Concatenate every emitted chunk into one contiguous buffer.
        var ciphertext = new ArrayBufferWriter<byte>();
        await foreach (var chunk in encryptedChunks)
        {
            ciphertext.Write(chunk.Span);
        }

        return ciphertext.WrittenMemory;
    }

    /// <inheritdoc />
    [Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    public override async IAsyncEnumerable<ReadOnlyMemory<byte>> EncryptAsync(string vaultResourceName,
        Stream plaintextStream,
        string keyName, EncryptionOptions encryptionOptions,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
        ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName));
        ArgumentVerifier.ThrowIfNull(plaintextStream, nameof(plaintextStream));
        ArgumentVerifier.ThrowIfNull(encryptionOptions, nameof(encryptionOptions));

        // Rethrows whatever exception the stream processor reports through its OnException event.
        EventHandler<Exception> exceptionHandler = (_, ex) => throw ex;

        // A null, empty or whitespace-only decryption key name is treated as "not provided"
        // (whitespace isn't likely a valid key name either).
        var shouldOmitDecryptionKeyName = string.IsNullOrWhiteSpace(encryptionOptions.DecryptionKeyName);

        var encryptRequestOptions = new Autogenerated.EncryptRequestOptions
        {
            ComponentName = vaultResourceName,
            DataEncryptionCipher = encryptionOptions.EncryptionCipher.GetValueFromEnumMember(),
            KeyName = keyName,
            KeyWrapAlgorithm = encryptionOptions.KeyWrapAlgorithm.GetValueFromEnumMember(),
            OmitDecryptionKeyName = shouldOmitDecryptionKeyName
        };

        if (!shouldOmitDecryptionKeyName)
        {
            // Copy the caller-supplied decryption key name onto the request. The value is known to be
            // non-blank here, so no further null/empty check is needed. Assigning the request field to
            // itself would leave it unset and the name would never be sent.
            encryptRequestOptions.DecryptionKeyName = encryptionOptions.DecryptionKeyName;
        }

        var options = CreateCallOptions(headers: null, cancellationToken);
        var duplexStream = Client.EncryptAlpha1(options);

        using var streamProcessor = new EncryptionStreamProcessor();
        try
        {
            streamProcessor.OnException += exceptionHandler;
            await streamProcessor.ProcessStreamAsync(plaintextStream, duplexStream, encryptRequestOptions,
                encryptionOptions.StreamingBlockSizeInBytes,
                cancellationToken);

            // Yield each processed chunk to the caller as it becomes available.
            await foreach (var value in streamProcessor.GetProcessedDataAsync(cancellationToken))
            {
                yield return value;
            }
        }
        finally
        {
            // Always detach the handler, even when enumeration stops early or fails.
            streamProcessor.OnException -= exceptionHandler;
        }
    }

    /// <inheritdoc />
    [Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    public override async IAsyncEnumerable<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName,
        Stream ciphertextStream, string keyName,
        DecryptionOptions decryptionOptions, 
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
        ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName));
        ArgumentVerifier.ThrowIfNull(ciphertextStream, nameof(ciphertextStream));

        // Fall back to default options when the caller supplies none.
        if (decryptionOptions is null)
        {
            decryptionOptions = new DecryptionOptions();
        }

        // Exceptions raised by the stream processor are rethrown to the caller of this method.
        EventHandler<Exception> rethrow = (_, error) => throw error;

        var requestOptions = new Autogenerated.DecryptRequestOptions
        {
            ComponentName = vaultResourceName,
            KeyName = keyName
        };

        var callOptions = CreateCallOptions(headers: null, cancellationToken);
        var duplexCall = client.DecryptAlpha1(callOptions);

        using var processor = new DecryptionStreamProcessor();
        try
        {
            processor.OnException += rethrow;
            await processor.ProcessStreamAsync(
                ciphertextStream,
                duplexCall,
                decryptionOptions.StreamingBlockSizeInBytes,
                requestOptions,
                cancellationToken);

            // Hand each processed chunk to the caller as it becomes available.
            await foreach (var plaintextChunk in processor.GetProcessedDataAsync(cancellationToken))
            {
                yield return plaintextChunk;
            }
        }
        finally
        {
            // Always detach the handler, whether enumeration completed, failed or was abandoned.
            processor.OnException -= rethrow;
        }
    }

    /// <inheritdoc />
    [Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    public override IAsyncEnumerable<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName,
        Stream ciphertextStream, string keyName, CancellationToken cancellationToken = default)
    {
        // Delegate to the options-taking overload with a default DecryptionOptions instance.
        var defaultOptions = new DecryptionOptions();
        return DecryptAsync(vaultResourceName, ciphertextStream, keyName, defaultOptions, cancellationToken);
    }

    /// <inheritdoc />
    [Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    public override async Task<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName,
        ReadOnlyMemory<byte> ciphertextBytes, string keyName, DecryptionOptions decryptionOptions,
        CancellationToken cancellationToken = default)
    {
        // Wrap the caller's bytes in a stream so the stream-based overload does the actual work.
        using var ciphertextStream = ciphertextBytes.CreateMemoryStream(true);

        // Collect every decrypted chunk into a single contiguous buffer.
        var plaintext = new ArrayBufferWriter<byte>();
        var decryptedChunks = DecryptAsync(vaultResourceName, ciphertextStream, keyName, decryptionOptions,
            cancellationToken);

        await foreach (var chunk in decryptedChunks)
        {
            plaintext.Write(chunk.Span);
        }

        return plaintext.WrittenMemory;
    }

    /// <inheritdoc />
    [Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    public override async Task<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName,
        ReadOnlyMemory<byte> ciphertextBytes, string keyName, CancellationToken cancellationToken = default)
    {
        // Delegate to the options-taking overload with a default DecryptionOptions instance.
        var defaultOptions = new DecryptionOptions();
        return await DecryptAsync(vaultResourceName, ciphertextBytes, keyName, defaultOptions, cancellationToken);
    }

    #region Subtle Crypto Implementation

    ///// <inheritdoc/>
    //[Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    //public override async Task<(string Name, string PublicKey)> GetKeyAsync(string vaultResourceName, string keyName, Autogenerated.SubtleGetKeyRequest.Types.KeyFormat keyFormat,
    //    CancellationToken cancellationToken = default)
    //{
    //    ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName));

    //    var envelope = new Autogenerated.SubtleGetKeyRequest()
    //    {
    //        ComponentName = vaultResourceName, Format = keyFormat, Name = keyName
    //    };

    //    var options = CreateCallOptions(headers: null, cancellationToken);
    //    Autogenerated.SubtleGetKeyResponse response;

    //    try
    //    {
    //        response = await client.SubtleGetKeyAlpha1Async(envelope, options);
    //    }
    //    catch (RpcException ex)
    //    {
    //        throw new DaprException(
    //            "Cryptography operation failed: the Dapr endpoint indicated a failure. See InnerException for details", ex);
    //    }

    //    return (response.Name, response.PublicKey);
    //}

    ///// <inheritdoc/>
    //[Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    //public override async Task<(byte[] CipherTextBytes, byte[] AuthenticationTag)> EncryptAsync(string vaultResourceName, byte[] plainTextBytes, string algorithm,
    //    string keyName, byte[] nonce, byte[] associatedData, CancellationToken cancellationToken = default)
    //{
    //    ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(algorithm, nameof(algorithm));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName));

    //    var envelope = new Autogenerated.SubtleEncryptRequest
    //    {
    //        ComponentName = vaultResourceName,
    //        Algorithm = algorithm,
    //        KeyName = keyName,
    //        Nonce = ByteString.CopyFrom(nonce),
    //        Plaintext = ByteString.CopyFrom(plainTextBytes),
    //        AssociatedData = ByteString.CopyFrom(associatedData)
    //    };

    //    var options = CreateCallOptions(headers: null, cancellationToken);
    //    Autogenerated.SubtleEncryptResponse response;

    //    try
    //    {
    //        response = await client.SubtleEncryptAlpha1Async(envelope, options);
    //    }
    //    catch (RpcException ex)
    //    {
    //        throw new DaprException(
    //            "Cryptography operation failed: the Dapr endpoint indicated a failure. See InnerException for details",
    //            ex);
    //    }

    //    return (response.Ciphertext.ToByteArray(), response.Tag.ToByteArray() ?? Array.Empty<byte>());
    //}

    ///// <inheritdoc/>
    //[Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    //public override async Task<byte[]> DecryptAsync(string vaultResourceName, byte[] cipherTextBytes, string algorithm, string keyName, byte[] nonce, byte[] tag,
    //    byte[] associatedData, CancellationToken cancellationToken = default)
    //{
    //    ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(algorithm, nameof(algorithm));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName));

    //    var envelope = new Autogenerated.SubtleDecryptRequest
    //    {
    //        ComponentName = vaultResourceName,
    //        Algorithm = algorithm,
    //        KeyName = keyName,
    //        Nonce = ByteString.CopyFrom(nonce),
    //        Ciphertext = ByteString.CopyFrom(cipherTextBytes),
    //        AssociatedData = ByteString.CopyFrom(associatedData),
    //        Tag = ByteString.CopyFrom(tag)
    //    };

    //    var options = CreateCallOptions(headers: null, cancellationToken);
    //    Autogenerated.SubtleDecryptResponse response;

    //    try
    //    {
    //        response = await client.SubtleDecryptAlpha1Async(envelope, options);
    //    }
    //    catch (RpcException ex)
    //    {
    //        throw new DaprException(
    //            "Cryptography operation failed: the Dapr endpoint indicated a failure. See InnerException for details", ex);
    //    }

    //    return response.Plaintext.ToByteArray();
    //}

    ///// <inheritdoc/>
    //[Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    //public override async Task<(byte[] WrappedKey, byte[] AuthenticationTag)> WrapKeyAsync(string vaultResourceName, byte[] plainTextKey, string keyName,
    //    string algorithm, byte[] nonce, byte[] associatedData, CancellationToken cancellationToken = default)
    //{
    //    ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(algorithm, nameof(algorithm));

    //    var envelope = new Autogenerated.SubtleWrapKeyRequest
    //    {
    //        ComponentName = vaultResourceName,
    //        Algorithm = algorithm,
    //        KeyName = keyName,
    //        Nonce = ByteString.CopyFrom(nonce),
    //        PlaintextKey = ByteString.CopyFrom(plainTextKey),
    //        AssociatedData = ByteString.CopyFrom(associatedData)
    //    };

    //    var options = CreateCallOptions(headers: null, cancellationToken);
    //    Autogenerated.SubtleWrapKeyResponse response;

    //    try
    //    {
    //        response = await client.SubtleWrapKeyAlpha1Async(envelope, options);
    //    }
    //    catch (RpcException ex)
    //    {
    //        throw new DaprException(
    //            "Cryptography operation failed: the Dapr endpoint indicated a failure. See InnerException for details",
    //            ex);
    //    }

    //    return (response.WrappedKey.ToByteArray(), response.Tag.ToByteArray() ?? Array.Empty<byte>());
    //}

    ///// <inheritdoc/>
    //[Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    //public override async Task<byte[]> UnwrapKeyAsync(string vaultResourceName, byte[] wrappedKey, string algorithm,
    //    string keyName, byte[] nonce, byte[] tag, byte[] associatedData, CancellationToken cancellationToken = default)
    //{
    //    ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(algorithm, nameof(algorithm));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName));

    //    var envelope = new Autogenerated.SubtleUnwrapKeyRequest
    //    {
    //        ComponentName = vaultResourceName,
    //        WrappedKey = ByteString.CopyFrom(wrappedKey),
    //        AssociatedData = ByteString.CopyFrom(associatedData),
    //        Algorithm = algorithm,
    //        KeyName = keyName,
    //        Nonce = ByteString.CopyFrom(nonce),
    //        Tag = ByteString.CopyFrom(tag)
    //    };

    //    var options = CreateCallOptions(headers: null, cancellationToken);
    //    Autogenerated.SubtleUnwrapKeyResponse response;

    //    try
    //    {
    //        response = await client.SubtleUnwrapKeyAlpha1Async(envelope, options);
    //    }
    //    catch (RpcException ex)
    //    {
    //        throw new DaprException(
    //            "Cryptography operation failed: the Dapr endpoint indicated a failure. See InnerException for details",
    //            ex);
    //    }

    //    return response.PlaintextKey.ToByteArray();
    //}

    ///// <inheritdoc/>
    //[Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    //public override async Task<byte[]> SignAsync(string vaultResourceName, byte[] digest, string algorithm, string keyName, CancellationToken cancellationToken = default)
    //{
    //    ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(algorithm, nameof(algorithm));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName));

    //    var envelope = new Autogenerated.SubtleSignRequest
    //    {
    //        ComponentName = vaultResourceName,
    //        Digest = ByteString.CopyFrom(digest),
    //        Algorithm = algorithm,
    //        KeyName = keyName
    //    };

    //    var options = CreateCallOptions(headers: null, cancellationToken);
    //    Autogenerated.SubtleSignResponse response;

    //    try
    //    {
    //        response = await client.SubtleSignAlpha1Async(envelope, options);
    //    }
    //    catch (RpcException ex)
    //    {
    //        throw new DaprException(
    //            "Cryptography operation failed: the Dapr endpoint indicated a failure. See InnerException for details",
    //            ex);
    //    }

    //    return response.Signature.ToByteArray();
    //}

    ///// <inheritdoc/>
    //[Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
    //public override async Task<bool> VerifyAsync(string vaultResourceName, byte[] digest, byte[] signature,
    //    string algorithm, string keyName, CancellationToken cancellationToken = default)
    //{
    //    ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(algorithm, nameof(algorithm));
    //    ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName));

    //    var envelope = new Autogenerated.SubtleVerifyRequest
    //    {
    //        ComponentName = vaultResourceName,
    //        Algorithm = algorithm,
    //        KeyName = keyName,
    //        Signature = ByteString.CopyFrom(signature),
    //        Digest = ByteString.CopyFrom(digest)
    //    };

    //    var options = CreateCallOptions(headers: null, cancellationToken);
    //    Autogenerated.SubtleVerifyResponse response;

    //    try
    //    {
    //        response = await client.SubtleVerifyAlpha1Async(envelope, options);
    //    }
    //    catch (RpcException ex)
    //    {
    //        throw new DaprException(
    //            "Cryptography operation failed: the Dapr endpoint indicated a failure. See InnerException for details",
    //            ex);
    //    }

    //    return response.Valid;
    //}

    #endregion


    #endregion

    #region Distributed Lock API

    /// <inheritdoc/>
    [Experimental("DAPR_DISTRIBUTEDLOCK", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/distributed-lock/distributed-lock-api-overview/")]
    public async override Task<TryLockResponse> Lock(
        string storeName,
        string resourceId,
        string lockOwner,
        Int32 expiryInSeconds,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(resourceId, nameof(resourceId));
        ArgumentVerifier.ThrowIfNullOrEmpty(lockOwner, nameof(lockOwner));

        // The expiry must be a strictly positive number of seconds.
        if (expiryInSeconds <= 0)
        {
            throw new ArgumentException("The value cannot be zero or less than zero: " + expiryInSeconds);
        }

        var lockRequest = new Autogenerated.TryLockRequest()
        {
            StoreName = storeName,
            ResourceId = resourceId,
            LockOwner = lockOwner,
            ExpiryInSeconds = expiryInSeconds
        };

        try
        {
            var callOptions = CreateCallOptions(headers: null, cancellationToken);
            var grpcResponse = await client.TryLockAlpha1Async(lockRequest, callOptions);

            // Echo the caller's identifiers back together with the outcome reported by the sidecar.
            return new TryLockResponse()
            {
                StoreName = storeName,
                ResourceId = resourceId,
                LockOwner = lockOwner,
                Success = grpcResponse.Success
            };
        }
        catch (RpcException rpcError)
        {
            throw new DaprException(
                "Lock operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcError);
        }
    }

    /// <inheritdoc/>
    [Experimental("DAPR_DISTRIBUTEDLOCK", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/distributed-lock/distributed-lock-api-overview/")]
    public async override Task<UnlockResponse> Unlock(
        string storeName,
        string resourceId,
        string lockOwner,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
        ArgumentVerifier.ThrowIfNullOrEmpty(resourceId, nameof(resourceId));
        ArgumentVerifier.ThrowIfNullOrEmpty(lockOwner, nameof(lockOwner));

        var unlockRequest = new Autogenerated.UnlockRequest()
        {
            StoreName = storeName,
            ResourceId = resourceId,
            LockOwner = lockOwner
        };

        var callOptions = CreateCallOptions(headers: null, cancellationToken);

        Autogenerated.UnlockResponse grpcResponse;
        try
        {
            grpcResponse = await client.UnlockAlpha1Async(unlockRequest, callOptions);
        }
        catch (RpcException rpcError)
        {
            throw new DaprException(
                "Lock operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcError);
        }

        // Translate the wire-level status into the SDK's LockStatus before returning it.
        var status = GetUnLockStatus(grpcResponse.Status);
        return new UnlockResponse(status);
    }

    #endregion

    #region Dapr Sidecar Methods

    /// <inheritdoc/>
    public override async Task<bool> CheckHealthAsync(CancellationToken cancellationToken = default)
    {
        const string path = "/v1.0/healthz";

        // HttpRequestMessage is IDisposable; scope it so it is released when the probe finishes.
        using var request = new HttpRequestMessage(HttpMethod.Get, new Uri(this.httpEndpoint, path));

        if (this.apiTokenHeader is not null)
        {
            request.Headers.Add(this.apiTokenHeader.Value.Key, this.apiTokenHeader.Value.Value);
        }

        try
        {
            // Only the status line is needed, so stop reading once the response headers have arrived.
            using var response =
                await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
            return response.IsSuccessStatusCode;
        }
        catch (HttpRequestException)
        {
            // A request-level failure is reported as "not healthy" rather than thrown.
            return false;
        }
    }

    /// <inheritdoc/>
    public override async Task<bool> CheckOutboundHealthAsync(CancellationToken cancellationToken = default)
    {
        const string path = "/v1.0/healthz/outbound";

        // HttpRequestMessage is IDisposable; scope it so it is released when the probe finishes.
        using var request = new HttpRequestMessage(HttpMethod.Get, new Uri(this.httpEndpoint, path));

        if (this.apiTokenHeader is not null)
        {
            request.Headers.Add(this.apiTokenHeader.Value.Key, this.apiTokenHeader.Value.Value);
        }

        try
        {
            // Only the status line is needed, so stop reading once the response headers have arrived.
            using var response =
                await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
            return response.IsSuccessStatusCode;
        }
        catch (HttpRequestException)
        {
            // A request-level failure is reported as "not healthy" rather than thrown.
            return false;
        }
    }

    /// <inheritdoc/>
    public override async Task WaitForSidecarAsync(CancellationToken cancellationToken = default)
    {
        var pollInterval = TimeSpan.FromMilliseconds(500);

        // Keep probing the outbound health endpoint, pausing between attempts, until it reports healthy.
        while (!await CheckOutboundHealthAsync(cancellationToken))
        {
            await Task.Delay(pollInterval, cancellationToken);
        }
    }

    /// <inheritdoc/>
    public async override Task ShutdownSidecarAsync(CancellationToken cancellationToken = default)
    {
        // Send an empty shutdown request through the gRPC client using the standard call options.
        var shutdownRequest = new Autogenerated.ShutdownRequest();
        var callOptions = CreateCallOptions(null, cancellationToken);

        await client.ShutdownAsync(shutdownRequest, callOptions);
    }

    /// <inheritdoc/>
    public override async Task<DaprMetadata> GetMetadataAsync(CancellationToken cancellationToken = default)
    {
        var callOptions = CreateCallOptions(headers: null, cancellationToken);
        try
        {
            var metadata = await client.GetMetadataAsync(new Autogenerated.GetMetadataRequest(), callOptions);

            // Each section of the response may be absent; substitute an empty value in that case.
            var id = metadata.Id ?? "";

            var actors = metadata.ActorRuntime?.ActiveActors
                             ?.Select(actor => new DaprActorMetadata(actor.Type, actor.Count))
                             .ToList()
                         ?? new List<DaprActorMetadata>();

            var extended = metadata.ExtendedMetadata
                               ?.ToDictionary(entry => entry.Key, entry => entry.Value)
                           ?? new Dictionary<string, string>();

            var components = metadata.RegisteredComponents
                                 ?.Select(component => new DaprComponentsMetadata(
                                     component.Name,
                                     component.Type,
                                     component.Version,
                                     component.Capabilities.ToArray()))
                                 .ToList()
                             ?? new List<DaprComponentsMetadata>();

            return new DaprMetadata(id, actors, extended, components);
        }
        catch (RpcException rpcError)
        {
            throw new DaprException(
                "Get metadata operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcError);
        }
    }

    /// <inheritdoc/>
    public override async Task SetMetadataAsync(string attributeName, string attributeValue,
        CancellationToken cancellationToken = default)
    {
        ArgumentVerifier.ThrowIfNullOrEmpty(attributeName, nameof(attributeName));

        var request = new Autogenerated.SetMetadataRequest()
        {
            Key = attributeName,
            Value = attributeValue
        };
        var callOptions = CreateCallOptions(headers: null, cancellationToken);

        try
        {
            // The call's result is not used; only success or failure matters here.
            await this.Client.SetMetadataAsync(request, callOptions);
        }
        catch (RpcException rpcError)
        {
            throw new DaprException(
                "Set metadata operation failed: the Dapr endpoint indicated a failure. See InnerException for details.",
                rpcError);
        }
    }

    #endregion

    protected override void Dispose(bool disposing)
    {
        // Nothing is released unless this is an explicit dispose.
        if (!disposing)
        {
            return;
        }

        // Release the gRPC channel and the HTTP client held by this instance.
        this.channel.Dispose();
        this.httpClient.Dispose();
    }

    #region Helper Methods

    /// <summary>
    /// Builds the <see cref="CallOptions"/> used for gRPC calls: attaches the given headers (or a new
    /// <see cref="Metadata"/> when none are given), a User-Agent header and, when one is configured,
    /// the Dapr API token header.
    /// </summary>
    /// <param name="headers">Headers to send with the call; may be null.</param>
    /// <param name="cancellationToken">Token to associate with the call.</param>
    /// <returns>The populated call options.</returns>
    private CallOptions CreateCallOptions(Metadata headers, CancellationToken cancellationToken)
    {
        var callOptions = new CallOptions(headers: headers ?? new Metadata(), cancellationToken: cancellationToken);

        var callHeaders = callOptions.Headers;
        if (callHeaders == null)
        {
            return callOptions;
        }

        callHeaders.Add("User-Agent", UserAgent().ToString());

        // add token for dapr api token based authentication
        if (this.apiTokenHeader is { } tokenHeader)
        {
            callHeaders.Add(tokenHeader.Key, tokenHeader.Value);
        }

        return callOptions;
    }

    /// <summary>
    /// Invokes a unary gRPC call with call options built from the supplied cancellation token.
    /// This is the intended home for shared exception handling; at present the call is simply
    /// awaited and its response returned.
    /// </summary>
    /// <typeparam name="TResponse">The response type produced by the gRPC call.</typeparam>
    /// <param name="callFunc">Delegate that starts the call using the prepared <see cref="CallOptions"/>.</param>
    /// <param name="cancellationToken">Token passed into the call options.</param>
    /// <returns>The awaited response of the call.</returns>
    private async Task<TResponse> MakeGrpcCallHandleError<TResponse>(
        Func<CallOptions, AsyncUnaryCall<TResponse>> callFunc, CancellationToken cancellationToken = default)
    {
        var preparedOptions = CreateCallOptions(headers: null, cancellationToken);
        var pendingCall = callFunc(preparedOptions);
        return await pendingCall;
    }

    /// <summary>
    /// Converts SDK state options into their generated gRPC counterpart, copying only the
    /// consistency and concurrency settings that are actually set.
    /// </summary>
    /// <param name="stateOptions">The SDK-level state options to convert.</param>
    /// <returns>A new generated state options instance.</returns>
    private Autogenerated.StateOptions ToAutoGeneratedStateOptions(StateOptions stateOptions)
    {
        var converted = new Autogenerated.StateOptions();

        if (stateOptions.Consistency is { } consistency)
        {
            converted.Consistency = GetStateConsistencyForConsistencyMode(consistency);
        }

        if (stateOptions.Concurrency is { } concurrency)
        {
            converted.Concurrency = GetStateConcurrencyForConcurrencyMode(concurrency);
        }

        return converted;
    }

    /// <summary>
    /// Maps an SDK <see cref="ConsistencyMode"/> onto the generated gRPC consistency value.
    /// </summary>
    /// <param name="consistencyMode">The SDK consistency mode.</param>
    /// <returns>The matching generated enum value.</returns>
    /// <exception cref="ArgumentException">Thrown for any mode other than Eventual or Strong.</exception>
    private static Autogenerated.StateOptions.Types.StateConsistency GetStateConsistencyForConsistencyMode(
        ConsistencyMode consistencyMode)
    {
        switch (consistencyMode)
        {
            case ConsistencyMode.Eventual:
                return Autogenerated.StateOptions.Types.StateConsistency.ConsistencyEventual;
            case ConsistencyMode.Strong:
                return Autogenerated.StateOptions.Types.StateConsistency.ConsistencyStrong;
            default:
                throw new ArgumentException($"{consistencyMode} Consistency Mode is not supported.");
        }
    }

    /// <summary>
    /// Maps an SDK <see cref="ConcurrencyMode"/> onto the generated gRPC concurrency value.
    /// </summary>
    /// <param name="concurrencyMode">The SDK concurrency mode.</param>
    /// <returns>The matching generated enum value.</returns>
    /// <exception cref="ArgumentException">Thrown for any mode other than FirstWrite or LastWrite.</exception>
    private static Autogenerated.StateOptions.Types.StateConcurrency GetStateConcurrencyForConcurrencyMode(
        ConcurrencyMode concurrencyMode)
    {
        switch (concurrencyMode)
        {
            case ConcurrencyMode.FirstWrite:
                return Autogenerated.StateOptions.Types.StateConcurrency.ConcurrencyFirstWrite;
            case ConcurrencyMode.LastWrite:
                return Autogenerated.StateOptions.Types.StateConcurrency.ConcurrencyLastWrite;
            default:
                throw new ArgumentException($"{concurrencyMode} Concurrency Mode is not supported.");
        }
    }

    /// <summary>
    /// Maps the unlock status from the generated gRPC response onto the SDK's <see cref="LockStatus"/>.
    /// </summary>
    /// <param name="status">The status carried by the generated unlock response.</param>
    /// <returns>The matching SDK lock status.</returns>
    /// <exception cref="ArgumentException">Thrown when the status has no SDK equivalent listed here.</exception>
    private static LockStatus GetUnLockStatus(Autogenerated.UnlockResponse.Types.Status status)
    {
        switch (status)
        {
            case Autogenerated.UnlockResponse.Types.Status.Success:
                return LockStatus.Success;
            case Autogenerated.UnlockResponse.Types.Status.LockDoesNotExist:
                return LockStatus.LockDoesNotExist;
            case Autogenerated.UnlockResponse.Types.Status.LockBelongsToOthers:
                return LockStatus.LockBelongsToOthers;
            case Autogenerated.UnlockResponse.Types.Status.InternalError:
                return LockStatus.InternalError;
            default:
                throw new ArgumentException($"{status} Status is not supported.");
        }
    }

    #endregion Helper Methods
}
